Big Data Spark Ecosystem · The Path of Cultivation · Spark Blog

2019-05-15 | Docs Language : Simplified Chinese & English | Programming : Spark | Website : www.geekparkhub.com | OpenSource | GeekDeveloper : JEEP-711 | GitHub : github.com/geekparkhub | Gitee : gitee.com/geekparkhub

🐘 Spark Technology · The Path of Cultivation · The Golden Immortal's Attainment 🐘



🔥 1. Spark Basics 🔥

1.1 Spark Overview

1.1.1 Spark Modules

(Figure: Spark modules)

1.1.2 Spark Features

1.1.3 Spark Use Cases

1.2 Spark Deployment

Extract spark-2.1.1-bin-hadoop2.7.tgz

[root@systemhub511 software]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

Rename spark-2.1.1-bin-hadoop2.7

[root@systemhub511 module]# mv spark-2.1.1-bin-hadoop2.7/ spark

1.3 Spark Run Modes

💥 1.3.1 Local Mode 💥

1.3.1.1 Local Mode Overview
1.3.1.2 Official Examples : Pi Estimation, WordCount & Local Debugging
bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 3.059446 s
Pi is roughly 3.1411463141146316
[root@systemhub511 spark]# bin/spark-shell
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = local[*], app id = local-1558677071165).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala>


[root@systemhub511 spark]# mkdir -p input/wordcount
[root@systemhub511 spark]# cd input/wordcount/
[root@systemhub511 wordcount]# vim wordcount_001.txt
hadoop spark hive
hadoop spark hadoop
hbase flume hive
scala java oozie
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (spark,2), (hive,2), (hadoop,3), (oozie,1), (flume,1), (java,1), (hbase,1))
scala>
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("./output/wordcount/")
[root@systemhub511 spark]# cd output/wordcount/
[root@systemhub511 wordcount]# ll
total 4
-rw-r--r--. 1 root root 79 May 24 14:48 part-00000
-rw-r--r--. 1 root root 0 May 24 14:48 _SUCCESS
[root@systemhub511 wordcount]# cat part-00000
(scala,1)
(spark,2)
(hive,2)
(hadoop,3)
(oozie,1)
(flume,1)
(java,1)
(hbase,1)
[root@systemhub511 wordcount]#
1.3.1.3 Submission Flow

(Figure: submission flow)

1.3.1.4 Data Flow
Operation | Description
textFile("input") | Reads the data under the local input directory
flatMap(_.split(" ")) | Flattening: splits each line on the space delimiter, mapping it into individual words
map((_,1)) | Maps each word into a (word, 1) tuple
reduceByKey(_+_) | Aggregates (sums) the values by key
collect | Collects the results to the Driver for display


💥 1.3.2 Standalone Mode 💥

1.3.2.1 Standalone Mode Overview

(Figure: Standalone mode architecture)

1.3.2.2 Standalone Mode QuickStart
[root@systemhub511 spark]# cd conf/
[root@systemhub511 conf]# mv slaves.template slaves
[root@systemhub511 conf]# mv spark-env.sh.template spark-env.sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
systemhub511
systemhub611
systemhub711
# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST=systemhub511
SPARK_MASTER_PORT=7077
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-systemhub511.out
systemhub711: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub711.out
systemhub611: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub611.out
systemhub511: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub511.out
[root@systemhub511 spark]#
[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
30651 org.apache.spark.deploy.worker.Worker
30443 org.apache.spark.deploy.master.Master
813 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
10369 org.apache.spark.deploy.worker.Worker
11777 sun.tools.jps.Jps
================ root@systemhub711 All Processes ===========
8960 org.apache.spark.deploy.worker.Worker
10364 sun.tools.jps.Jps
[root@systemhub511 spark]#
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://systemhub511:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://systemhub511:7077 \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 6.478381 s
Pi is roughly 3.1405883140588315
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = spark://systemhub511:7077, app id = app-20190524174512-0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (hive,2), (oozie,1), (java,1), (spark,2), (hadoop,3), (flume,1), (hbase,1))
scala>


[root@systemhub511 conf]# mv spark-defaults.conf.template spark-defaults.conf
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
spark.eventLog.dir : all the information recorded while an Application runs is written under the path this property specifies.
spark.history.ui.port=18080 : the history server Web UI listens on port 18080
spark.history.fs.logDirectory=hdfs://systemhub511:9000/directory : with this property configured, there is no need to pass a path explicitly when running start-history-server.sh; the Spark History Server shows only the information under this path.
spark.history.retainedApplications=30 : the number of Application history entries to retain; beyond this, the oldest application information is deleted. This counts the applications held in memory, not the number shown on the page.
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 hadoop]# sbin/start-dfs.sh
[root@systemhub511 spark]# hadoop fs -mkdir /directory
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-history-server.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect


1.3.2.3 Spark HA (High Availability)

(Figure: Spark HA architecture)

# SPARK_MASTER_HOST=systemhub511
# SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=systemhub511,systemhub611,systemhub711 -Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
[root@systemhub511 spark]# /opt/module/hadoop/sbin/start-dfs.sh
[root@systemhub511 spark]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub611 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub711 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub611 ~]# /opt/module/spark/sbin/start-master.sh
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077,systemhub611:7077

http://systemhub511:8080 | systemhub511 node status is ALIVE
http://systemhub611:8080 | systemhub611 node status is STANDBY


[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
32242 org.apache.hadoop.hdfs.server.namenode.NameNode
11206 org.apache.spark.deploy.master.Master
11368 org.apache.spark.deploy.worker.Worker
9705 org.apache.zookeeper.server.quorum.QuorumPeerMain
32444 org.apache.hadoop.hdfs.server.datanode.DataNode
5228 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
9157 org.apache.spark.deploy.master.Master
8901 org.apache.spark.deploy.worker.Worker
2822 sun.tools.jps.Jps
30214 org.apache.hadoop.hdfs.server.datanode.DataNode
7495 org.apache.zookeeper.server.quorum.QuorumPeerMain
================ root@systemhub711 All Processes ===========
5312 org.apache.spark.deploy.worker.Worker
31568 sun.tools.jps.Jps
26869 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26647 org.apache.hadoop.hdfs.server.datanode.DataNode
4014 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@systemhub511 spark]#
[root@systemhub511 spark]# kill -9 11206


💥 1.3.3 Yarn Mode 💥

1.3.3.1 Yarn Mode Overview

(Figure: Yarn mode architecture)

1.3.3.2 Yarn Mode QuickStart
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
spark.yarn.historyServer.address=systemhub511:18080
spark.history.ui.port=18080
<!-- Whether to start a thread that checks how much physical memory each task is using and kills any task that exceeds its allocation; default is true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!-- Whether to start a thread that checks how much virtual memory each task is using and kills any task that exceeds its allocation; default is true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

💥 1.3.4 Mesos Mode 💥

1.3.4.1 Mesos Mode Overview

💥 1.3.5 Run Mode Comparison 💥

Mode | Machines | Cluster Processes | Owner
Local Mode | 1 | - | Spark
Standalone Mode | 3 | Master & Worker | Spark
Yarn Mode | 1 | Yarn & HDFS | Hadoop

💥 1.3.6 WordCount Example 💥

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark_server</artifactId>
<groupId>com.geekparkhub.core.spark</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-common</artifactId>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.geekparkhub.core.spark.application.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * WordCountApplication
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf = new SparkConf().setMaster(args(0)).setAppName("WordCountApplication")
    // Create SparkContext (the conf must be passed in)
    val sc = new SparkContext(sparkConf)
    // Read the input file
    val line: RDD[String] = sc.textFile(args(1))
    // Flatten each line into words
    val word: RDD[String] = line.flatMap(_.split(" "))
    // Map each word to a (word, 1) tuple
    val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
    // Sum the counts per word
    val wordCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    // Write the result out
    wordCount.saveAsTextFile(args(2))
    // Release resources
    sc.stop()
  }
}
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/input/wordcount
hadoop fs -put /opt/module/spark/input/wordcount/wordcount_001.txt /core_flow/spark/input/wordcount
bin/spark-submit \
--class com.geekparkhub.core.spark.application.wordcount.WordCount \
--master yarn \
./lib_jar/WordCount.jar yarn \
/core_flow/spark/input/wordcount/wordcount_001.txt \
/core_flow/spark/output/wordcount
[root@systemhub511 spark]# hadoop fs -ls -R /core_flow/spark/output/wordcount/
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/_SUCCESS
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00000
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00001
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00000
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(scala,1)
(hive,2)
(oozie,1)
(java,1)
[root@systemhub511 spark]#
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00001
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(spark,2)
(hadoop,3)
(flume,1)
(hbase,1)
[root@systemhub511 spark]#

🔥 1.3 Spark Core 🔥

1.3.1 RDD Overview

1.3.1.1 What is an RDD

An RDD (Resilient Distributed Dataset) is Spark's most fundamental data abstraction. In code it is an abstract class, representing a resilient, immutable, partitionable collection whose elements can be computed in parallel.

1.3.1.2 RDD Properties
* Internally, each RDD is characterized by five main properties:
*
* - 1. A list of partitions
* - 2. A function for computing each split
* - 3. A list of dependencies on other RDDs
* - 4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - 5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

1. A list of partitions (Partition), the basic unit the dataset is composed of;
2. A function for computing each partition;
3. Dependencies on other RDDs;
4. A Partitioner, the RDD's sharding function;
5. A list storing the preferred location for computing each Partition
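
These five properties correspond directly to members of the RDD abstract class. Below is a minimal sketch of a custom RDD, assuming Spark 2.1.x on the classpath (the class name and partitioning scheme are illustrative, not from the original post):

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

// Illustrative: n integers spread over two partitions, showing where the five properties live.
class RangeDemoRDD(sc: SparkContext, n: Int) extends RDD[Int](sc, Nil) { // Nil: no parent dependencies (property 3)
  // Property 1: the list of partitions
  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 },
          new Partition { override def index: Int = 1 })
  // Property 2: the function computing each split
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    (split.index until n by 2).iterator
  // Properties 4 and 5 (partitioner, preferred locations) keep their default implementations here.
}

new RangeDemoRDD(sc, 10).collect // Array(0, 2, 4, 6, 8, 1, 3, 5, 7, 9)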

1.3.1.3 RDD Characteristics

An RDD represents a read-only, partitioned dataset. The only way to change an RDD is through RDD transformations: a new RDD is derived from an existing one, and the new RDD carries all the information needed to derive it from the other RDDs. Dependencies therefore exist between RDDs, and execution is computed lazily along this lineage. If the lineage grows long, it can be cut by persisting an RDD.

1.3.1.3.1 Resilient
1.3.1.3.2 Partitioned

Logically an RDD is partitioned, and each partition's data exists only abstractly. At computation time, a compute function produces the data of each partition: if the RDD was built from an existing file system, compute reads the data from that file system; if the RDD was derived from another RDD, compute applies the transformation logic to the other RDD's data.

1.3.1.3.3 Read-Only

RDDs are read-only: to change the data in an RDD, you can only create a new RDD on top of the existing one.

Converting one RDD into another is done through a rich set of operators, no longer limited to writing only map and reduce as in MapReduce.

RDD operators fall into two classes: transformations, which convert RDDs and build up the RDD lineage, and actions, which trigger RDD computation to obtain a result or to save the RDD to a file system.

1.3.1.3.4 Dependencies

(Figure: RDD lineage)

As the figure shows, RDDs are converted through operators; a newly derived RDD contains all the information needed to derive it from the other RDDs. RDDs maintain this lineage relationship, which is also called a dependency.

There are two kinds of dependency: narrow dependencies, where partitions correspond one-to-one between RDDs, and wide dependencies, where each partition of the downstream RDD depends on every partition of the upstream RDD (also called the parent RDD), in a many-to-many relationship.
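
Both dependency types can be observed from the shell. A small sketch, assuming the spark-shell sessions above (only the dependency class names matter here):

scala> val pairs = sc.parallelize(Array((1,"a"),(2,"b"))).map(x => x)
scala> pairs.dependencies
// OneToOneDependency : map creates a narrow dependency
scala> pairs.reduceByKey((a,b) => a + b).dependencies
// ShuffleDependency : reduceByKey creates a wide dependency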

1.3.1.3.5 Caching

(Figure: caching an intermediate RDD result)

If the same RDD is used multiple times in an application, it can be cached: the RDD is computed from its lineage only the first time, and wherever it is used afterwards the data is taken straight from the cache instead of being recomputed from the lineage, which speeds up later reuse.

As the figure shows, RDD-1 goes through a series of transformations to produce RDD-n, which is saved to HDFS. RDD-1 has an intermediate result along the way; if that result is cached in memory, then in the later transformation from RDD-1 to RDD-m, the preceding RDD-0 is not recomputed.
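
A minimal caching sketch in the shell (cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); the file path is the one used earlier):

scala> val wc = sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
scala> wc.cache()   // marks the RDD for caching; nothing is computed yet
scala> wc.collect   // the first action computes the lineage and fills the cache
scala> wc.collect   // later actions read from the cache instead of recomputing
// For a different storage level, call wc.persist(StorageLevel.MEMORY_AND_DISK) before the first action
// (import org.apache.spark.storage.StorageLevel)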

1.3.1.3.6 CheckPoint

Although RDD lineage naturally provides fault tolerance (when a partition of an RDD fails or its data is lost, it can be rebuilt through the lineage), for long-running iterative applications the lineage between RDDs grows longer and longer as iteration proceeds; if an error occurs in a later iteration, rebuilding through a very long lineage inevitably hurts performance.

For this reason, RDDs support checkpointing, which saves the data to persistent storage and thereby cuts off the earlier lineage: a checkpointed RDD no longer needs to know its parent RDDs, since it can fetch its data from the checkpoint.

1.3.2 RDD Programming

1.3.2.1 Programming Model

In Spark, an RDD is represented as an object, and RDDs are transformed through method calls on the object. After defining an RDD through a series of transformations, you call an action to trigger computation; an action either returns a result to the application (count, collect, etc.) or saves data to a storage system (saveAsTextFile, etc.).
In Spark, RDD computation happens only when an action is encountered (lazy evaluation), which lets multiple transformations be pipelined at runtime.
A Spark developer writes a Driver program, which is submitted to the cluster to schedule the Workers. The Driver defines one or more RDDs and invokes actions on them; the Workers execute the RDD partition-computation tasks.

1.3.2.2 Creating RDDs
1.3.2.1 Creating an RDD from a Collection
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> val makerdd = sc.makeRDD(Array(511,611,711))
makerdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> makerdd.collect
res1: Array[Int] = Array(511, 611, 711)
scala>
1.3.2.2 Creating an RDD from an External Storage Dataset
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt")
res2: org.apache.spark.rdd.RDD[String] = /opt/module/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[3] at textFile at <console>:25
scala>
1.3.2.3 Creating an RDD from Another RDD
1.3.2.3 RDD Transformations
1.3.2.3.1 Value Types
1.3.2.3.1.1 map(func) Method
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
scala> rdd.map((_,1)).collect
res4: Array[(Int, Int)] = Array((511,1), (611,1), (711,1))
scala>
scala> rdd.map((_*2)).collect
res5: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.2 mapPartitions(func) Method
scala> rdd.mapPartitions(_.map(_*2)).collect
res11: Array[Int] = Array(1022, 1222, 1422)
scala>
1.3.2.3.1.3 mapPartitionsWithIndex(func) Method
scala> rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))).collect
res13: Array[(Int, Int)] = Array((1,511), (2,611), (3,711))
scala>
1.3.2.3.1.4 flatMap(func) Method
scala> val text = sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
text: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> text.flatMap(_.split(" ")).collect
res16: Array[String] = Array(hadoop, spark, hive, hadoop, spark, hadoop, hbase, flume, hive, scala, java, oozie)
scala>
1.3.2.3.1.5 Difference between map() and mapPartitions()
1.3.2.3.1.6 glom Method
scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(), Array(511), Array(611), Array(711))
scala>
1.3.2.3.1.7 groupBy(func) Method
scala> rdd.groupBy(_ % 2).collect
res18: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(611, 711, 511)))
scala>
1.3.2.3.1.8 filter(func) Method
scala> rdd.filter(_%3==0).collect
res20: Array[Int] = Array(711)
scala>
1.3.2.3.1.9 sample(withReplacement,fraction,seed) Method
scala> val rdd = sc.parallelize(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> rdd.sample(false,0.1,3).collect
res22: Array[Int] = Array(1, 33, 37, 50, 59, 69, 75, 78, 85, 98)
scala>
1.3.2.3.1.10 distinct([numTasks])) Method
scala> rdd.distinct(4).collect
res23: Array[Int] = Array(84, 100, 96, 52, 56, 4, 76, 16, 28, 80, 48, 32, 36, 24, 64, 92, 40, 72, 8, 12, 20, 60, 44, 88, 68, 13, 41, 61, 81, 21, 77, 53, 97, 25, 29, 65, 73, 57, 93, 33, 37, 45, 1, 89, 17, 69, 9, 85, 49, 5, 34, 82, 66, 22, 54, 98, 46, 30, 14, 50, 62, 42, 74, 90, 6, 70, 18, 38, 86, 58, 78, 26, 94, 10, 2, 19, 39, 15, 47, 71, 55, 95, 79, 59, 11, 35, 27, 75, 51, 23, 63, 83, 67, 3, 7, 91, 31, 87, 43, 99)
scala>
1.3.2.3.1.11 coalesce(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[28] at coalesce at <console>:26
scala> coalesceRDD.partitions.size
res25: Int = 3
scala>
1.3.2.3.1.12 repartition(numPartitions) Method
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.partitions.size
res26: Int = 4
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at repartition at <console>:26
scala> rerdd.partitions.size
res27: Int = 2
scala>
1.3.2.3.1.13 Difference between coalesce and repartition

1. coalesce repartitions and lets you choose whether to shuffle, controlled by the parameter shuffle: Boolean = false/true.

2. repartition actually calls coalesce with the shuffle forced on, as the source shows:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
1.3.2.3.1.14 sortBy(func,[ascending],[numTasks]) Method
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd.sortBy(x => x).collect()
res29: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x => x%3).collect()
res30: Array[Int] = Array(3, 1, 4, 2)
scala> rdd.sortBy(x => x,false).collect()
res31: Array[Int] = Array(4, 3, 2, 1)
scala>
1.3.2.3.1.15 pipe(command,[envVars]) Method
[root@systemhub511 ~]# vim /opt/module/spark/input/pipe.sh
[root@systemhub511 ~]# chmod 777 /opt/module/spark/input/pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
echo ">>>"${LINE}
done
scala> rdd.pipe("/opt/module/spark/input/pipe.sh").collect
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
scala>
1.3.2.3.2 Two-Value-Type Interactions
1.3.2.3.2.1 union(otherDataset) Method
scala> var rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.union(rdd2).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
scala>
1.3.2.3.2.2 subtract(otherDataset) Method
scala> rdd1.subtract(rdd2).collect
res0: Array[Int] = Array(2, 4, 1, 3)
scala>
1.3.2.3.2.3 intersection(otherDataset) Method
scala> rdd1.intersection(rdd2).collect
res1: Array[Int] = Array(5)
scala>
1.3.2.3.2.4 cartesian(otherDataset) Method
scala> rdd1.cartesian(rdd2).collect
res2: Array[(Int, Int)] = Array((1,5), (1,6), (1,7), (2,5), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,5), (3,6), (3,7), (4,5), (4,6), (4,7), (5,5), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
scala>
1.3.2.3.2.5 zip(otherDataset) Method
Note : zip requires both RDDs to have the same number of elements in each partition; the output below implies rdd2 had been redefined as sc.parallelize(6 to 10), since zipping 1 to 5 against 5 to 10 would fail with mismatched element counts.
scala> rdd1.zip(rdd2).collect
res4: Array[(Int, Int)] = Array((1,6), (2,7), (3,8), (4,9), (5,10))
scala>
1.3.2.3.3 Key-Value Types
1.3.2.3.3.1 partitionBy Method
scala> val rdd1 = sc.parallelize(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),4)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex((i,t)=>t.map((i,_))).collect
res3: Array[(Int, (Int, String))] = Array((0,(1,A)), (1,(2,B)), (2,(3,C)), (3,(4,D)))
scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
res5: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at partitionBy at <console>:27
scala> res5.partitions.size
res6: Int = 2
scala>
1.3.2.3.3.2 reduceByKey(func,[numTasks]) Method
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd.reduceByKey((x,y)=>x+y).collect
res7: Array[(String, Int)] = Array((female,6), (male,7))
scala>
1.3.2.3.3.3 groupByKey Method
scala> rdd.groupByKey(2).collect
res8: Array[(String, Iterable[Int])] = Array((female,CompactBuffer(5, 1)), (male,CompactBuffer(5, 2)))
scala>
1.3.2.3.3.4 Difference between reduceByKey and groupByKey

1. reduceByKey : aggregates by key, with a combine (pre-aggregation) step before the shuffle; the result is an RDD[k,v].

2. groupByKey : groups by key and shuffles directly.

3. Development guidance : reduceByKey performs better than groupByKey; prefer reduceByKey, but check whether the pre-aggregation affects your business logic.
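
A sketch of the same aggregation written both ways (the results match; only the shuffle behavior differs):

scala> val pairs = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
scala> pairs.reduceByKey(_+_).collect
// map-side combine first, so only partial sums cross the network: Array((female,6), (male,7))
scala> pairs.groupByKey().mapValues(_.sum).collect
// every (key, value) pair is shuffled, then summed afterwards: Array((female,6), (male,7))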

1.3.2.3.3.5 aggregateByKey Method

Parameters : (zeroValue:U,[partitioner:Partitioner])(seqOp: (U, V) => U,combOp: (U, U) => U)

1. Effect : in a (k,v)-pair RDD, values are grouped and merged by key: within each partition, each value and the accumulated initial value are passed to the seq function, and the result becomes a new (k,v) pair; the per-partition results are then merged by key, passing each group's values to the combine function (the first two values are combined, then the result is combined with the next value, and so on), and the key together with the final result is emitted as a new (k,v) pair.

2. Parameter description :
zeroValue : an initial value given to every key within each partition.
seqOp : a function that folds the values into the initial value, step by step, within each partition.
combOp : a function that merges the per-partition results.

(Figure: aggregateByKey flow)

scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.aggregateByKey(0)(math.max(_,_),_+_).collect
res9: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
scala>
scala> rdd.aggregateByKey(0)(_+_,_+_).collect
res10: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala> rdd.reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
1.3.2.3.3.6 foldByKey Method
scala> rdd.foldByKey(0)(_+_).collect
res12: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
1.3.2.3.3.7 combineByKey[C] Method

(Figure: combineByKey flow)

scala> rdd.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).collect
res15: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
scala>
1.3.2.3.3.8 sortByKey([ascending],[numTasks]) Method
scala> rdd.sortByKey().collect
res17: Array[(String, Int)] = Array((a,3), (a,2), (b,3), (c,6), (c,8), (c,4))
scala> rdd.sortByKey(false).collect
res19: Array[(String, Int)] = Array((c,4), (c,6), (c,8), (b,3), (a,3), (a,2))
scala>
1.3.2.3.3.9 mapValues Method
scala> rdd.mapValues(_*2).collect
res20: Array[(String, Int)] = Array((a,6), (a,4), (c,8), (b,6), (c,12), (c,16))
scala>
1.3.2.3.3.10 join(otherDataset,[numTasks]) Method
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.join(rdd1).collect
res21: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
scala> rdd.leftOuterJoin(rdd1).collect
res22: Array[(Int, (String, Option[Int]))] = Array((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,Some(6))))
scala> rdd.rightOuterJoin(rdd1).collect
res23: Array[(Int, (Option[String], Int))] = Array((1,(Some(a),4)), (2,(Some(b),5)), (3,(Some(c),6)))
scala>
1.3.2.3.3.11 cogroup(otherDataset,[numTasks]) Method
scala> rdd.cogroup(rdd1).collect
res24: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
scala>
1.3.2.4 Action
1.3.2.4.1 reduce(func) Method
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> rdd.reduce(_+_)
res25: Int = 55
scala>
1.3.2.4.2 collect() Method
scala> rdd.collect
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala>
1.3.2.4.3 count() Method
scala> rdd.count
res27: Long = 10
scala>
1.3.2.4.4 first() Method
scala> rdd.first
res28: Int = 1
scala>
1.3.2.4.5 take(n) Method
scala> rdd.take(2)
res30: Array[Int] = Array(1, 2)
scala>
1.3.2.4.6 takeOrdered(n) Method
scala> rdd.takeOrdered(3)
res31: Array[Int] = Array(1, 2, 3)
scala>
1.3.2.4.7 aggregate Method

Parameters : (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

Effect : aggregate folds the elements within each partition into the initial value using seqOp, then combines each partition's result with the initial value (zeroValue) using combOp. The final return type does not need to match the RDD's element type.

scala> rdd.aggregate(0)(_+_,_+_)
res32: Int = 55
scala>
1.3.2.4.8 fold(num)(func) Method
scala> rdd.fold(0)(_+_)
res34: Int = 55
scala>
1.3.2.4.9 saveAsTextFile(path) Method
1.3.2.4.10 saveAsSequenceFile(path) Method
1.3.2.4.11 saveAsObjectFile(path) Method
1.3.2.4.12 countByKey() Method
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> rdd.countByKey
res35: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
scala>
1.3.2.4.13 foreach(func) Method
scala> rdd.foreach(print)
1.3.2.5 Passing Functions to RDDs

In real development you often need to define your own operations on RDDs. The key thing to note is that initialization happens on the Driver, while the program actually runs on the Executors; this involves cross-process communication, and cross-process communication requires serialization.

1.3.2.5.1 Passing a Method

The method called here, isMatch(), is defined in the Search class, so the call is really this.isMatch(), where this is the Search instance. At runtime, the Search object must be serialized and shipped to the Executors.

1.3.2.5.2 Passing a Field

The member referenced here, query, is a field defined in the Search class, so the access is really this.query, where this is the Search instance. At runtime, the Search object must be serialized and shipped to the Executors.

package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * Search
 */
class Search(query: String) extends Serializable {
  // Filter: does the string contain `query`?
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  // Filter an RDD by passing a method reference (serializes `this`)
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }
  // Filter an RDD with a closure over the `query` field (also serializes `this`)
  def getMatche2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}
package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * TransFormAction
 */
object TransFormAction {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TransFormAction")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Create an RDD
    val word: RDD[String] = sc.parallelize(Array("abc", "dcd"))
    // Create a Search object
    val search = new Search("a")
    // Call the filtering method
    val searched: RDD[String] = search.getMatch1(word)
    // Print the results
    searched.collect().foreach(println)
    // Release resources
    sc.stop()
  }
}
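
If you would rather not make Search serializable, the usual workaround is to copy the needed field into a local variable inside the method, so that the task closure captures only a String rather than `this`. A sketch (the method name getMatch3 is illustrative):

// Inside the Search class:
def getMatch3(rdd: RDD[String]): RDD[String] = {
  val q = query                   // local copy: the closure no longer references `this`
  rdd.filter(x => x.contains(q))  // only the String q is serialized to the Executors
}
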
1.3.2.6 RDD Dependencies
1.3.2.6.1 Lineage

(Figure: RDD Lineage)

RDDs support only coarse-grained transformations, i.e. single operations performed over many records. The series of Lineage steps that created an RDD is recorded so that lost partitions can be recovered: an RDD's Lineage records its metadata and transformation behavior, so when part of the RDD's partition data is lost, it can recompute and recover the lost partitions from this information.

scala> sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
res0: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25
scala> res0.flatMap(_.split(" "))
res2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27
scala> res2.map((_,1))
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29
scala> res3.reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31
scala>
scala> res0.toDebugString
res5: String =
(2) /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
scala> res2.toDebugString
res6: String =
(2) MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
scala> res3.toDebugString
res7: String =
(2) MapPartitionsRDD[3] at map at <console>:29 []
| MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
scala> res4.toDebugString
res8: String =
(2) ShuffledRDD[4] at reduceByKey at <console>:31 []
+-(2) MapPartitionsRDD[3] at map at <console>:29 []
| MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
1.3.2.6.2 Narrow Dependencies

(Figure: narrow dependency)

1.3.2.6.3 Wide Dependencies

(Figure: wide dependency)

1.3.2.6.4 DAG

(Figure: DAG of stages)

1.3.2.6.5 Task Division (Key Point)
1.3.2.6.7 RDD Caching
1.3.2.6.8 RDD CheckPoint
scala> sc.setCheckpointDir("hdfs://systemhub511:9000/core_flow/spark/checkpoint")
scala> val rdd = sc.parallelize(Array("systemhub511"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val check = rdd.map(_+System.currentTimeMillis)
check: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:26
scala>
scala> check.collect
res10: Array[String] = Array(systemhub5111559138263898)
scala> check.collect
res11: Array[String] = Array(systemhub5111559138266443)
scala> check.collect
res12: Array[String] = Array(systemhub5111559138267862)
scala>
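
Note that the transcript above never actually calls checkpoint, which is why every collect recomputes the map and the timestamp changes. A sketch of the missing step (after checkpointing, repeated collects should return the same materialized value):

scala> val check2 = rdd.map(_ + System.currentTimeMillis)
scala> check2.checkpoint   // mark for checkpointing; written out by the next action
scala> check2.collect      // runs the job and persists the result under the checkpoint directory
scala> check2.collect      // now reads back from the checkpoint, so the value stays stable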

1.3.3 Key-Value RDD Data Partitioning

1.3.3.1 Getting an RDD's Partitioner
scala> rdd.partitioner
res14: Option[org.apache.spark.Partitioner] = None
1.3.3.2 Hash Partitioning
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:25
scala> nopar.mapPartitionsWithIndex((index,iter)=>{Iterator(index.toString+":"+iter.mkString("|"))}).collect
res15: Array[String] = Array(0:, 1:(1,3), 2:(1,2), 3:(2,4), 4:, 5:(2,3), 6:(3,6), 7:(3,8))
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at partitionBy at <console>:27
scala> hashpar.count
res20: Long = 6
scala> hashpar.partitioner
res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)
scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res22: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
scala>
1.3.3.3 Range Partitioning
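
RangePartitioner samples the keys and assigns ordered, roughly equal-sized key ranges to partitions, so keys end up both balanced and sorted across partitions. A sketch, assuming the shell session above:

scala> import org.apache.spark.RangePartitioner
scala> val kv = sc.parallelize(1 to 100).map((_,1))
scala> val ranged = kv.partitionBy(new RangePartitioner(4, kv))
scala> ranged.partitioner
// Some(org.apache.spark.RangePartitioner@...)
scala> ranged.mapPartitionsWithIndex((i,it) => Iterator((i,it.size))).collect
// four partitions of roughly 25 contiguous keys each, e.g. Array((0,25), (1,25), (2,25), (3,25))
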
1.3.3.4 Custom Partitioning
package com.geekparkhub.core.spark.application.partitioner
import org.apache.spark.Partitioner
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * CustomerPartitioner
 */
class CustomerPartitioner(partitions: Int) extends Partitioner {
  // Total number of partitions
  override def numPartitions: Int = partitions
  // Demo partitioning rule: route every key to partition 0
  override def getPartition(key: Any): Int = {
    0
  }
}
package com.geekparkhub.core.spark.application.partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * PartitionerAction
 */
object PartitionerAction {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PartitionerAction")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Create an RDD
    val word: RDD[String] = sc.parallelize(Array("abc", "dcd"))
    // Convert each element to a tuple
    val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
    // Repartition with the custom partitioner
    val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(new CustomerPartitioner(2))
    // Inspect the resulting partition assignment
    val indexAndData: RDD[(Int, (String, Int))] = partitioned.mapPartitionsWithIndex((i,t)=>t.map((i,_)))
    // Print the data
    indexAndData.collect().foreach(println)
    // Release resources
    sc.stop()
  }
}
(0,(abc,1))
(0,(dcd,1))

1.3.4 Data Loading & Saving

1.3.4.1 Loading & Saving File-Based Data
1.3.4.1 Text File
scala> sc.textFile("hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt")
res23: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[12] at textFile at <console>:26
scala> res23.toDebugString
res25: String =
(2) hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[12] at textFile at <console>:26 []
| hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[11] at textFile at <console>:26 []
scala>
scala> res23.saveAsTextFile("/core_flow/spark/output/wordcount/")
1.3.4.2 Json File
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON
scala>
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/json/001
[root@systemhub511 ~]# hadoop fs -put /opt/module/spark/examples/src/main/resources/people.json /core_flow/spark/json/001/
scala> val json = sc.textFile("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
json: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/json/001/people.json MapPartitionsRDD[14] at textFile at <console>:26
scala>
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[15] at map at <console>:28
scala>
scala> result.collect
res26: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
scala>
1.3.4.3 Sequence File
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:26
scala>
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
[root@systemhub511 ~]# cd /opt/module/spark/seqFile/
[root@systemhub511 seqFile]# ll -a
total 28
drwxr-xr-x. 2 root root 4096 May 29 23:57 .
drwxr-xr-x. 21 geekdeveloper geekdeveloper 4096 May 30 00:05 ..
-rw-r--r--. 1 root root 92 May 29 23:57 part-00000
-rw-r--r--. 1 root root 12 May 29 23:57 .part-00000.crc
-rw-r--r--. 1 root root 108 May 29 23:57 part-00003
-rw-r--r--. 1 root root 12 May 29 23:57 .part-00003.crc
-rw-r--r--. 1 root root 0 May 29 23:57 _SUCCESS
-rw-r--r--. 1 root root 8 May 29 23:57 ._SUCCESS.crc
[root@systemhub511 seqFile]# cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritabler[-o���]h�~u���
[root@systemhub511 seqFile]#
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at sequenceFile at <console>:26
scala>
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
1.3.4.4 ObjectFile
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala>
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
[root@systemhub511 ~]# cd /opt/module/spark/objectFile/
[root@systemhub511 objectFile]# ll
total 8
-rw-r--r--. 1 root root 138 May 30 00:05 part-00000
-rw-r--r--. 1 root root 138 May 30 00:05 part-00003
-rw-r--r--. 1 root root 0 May 30 00:05 _SUCCESS
[root@systemhub511 objectFile]# cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable� �L�h�l:T���#��ur[IM�`&v겥xp
[root@systemhub511 objectFile]#
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at objectFile at <console>:26
scala>
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
1.3.4.2 Loading & Saving via File Systems
1.3.4.1 HDFS

The whole Spark ecosystem is fully compatible with Hadoop, so Spark supports every file type and database type that Hadoop supports.
Also, because Hadoop's API comes in an old and a new version, Spark provides two sets of creation interfaces in order to stay compatible with all Hadoop versions.
For creating RDDs from external storage, hadoopRDD and newAPIHadoopRDD are the two most general function interfaces; they take the following four main parameters :

1. Input format (InputFormat) : specifies the input data type, e.g. TextInputFormat; the old and new versions are referenced as org.apache.hadoop.mapred.InputFormat and org.apache.hadoop.mapreduce.InputFormat (NewInputFormat) respectively
2. Key type : the K type in the [K,V] pairs
3. Value type : the V type in the [K,V] pairs
4. Partition count : the minimum number of partitions of the RDD created from external storage; if unspecified, the system uses the default defaultMinSplits.

The other creation APIs exist for the convenience of Spark application developers and are efficient specializations of these two interfaces; textFile, for example, exposes only the path parameter, while the other parameters take system-internal defaults.
1. Data stored compressed in Hadoop can be read without specifying the decompression method, because Hadoop's decompressor infers the algorithm from the file suffix.
2. If you don't know how to read some data type from Hadoop with Spark, look up how that data is read with map-reduce, then adapt that reading method into the hadoopRDD and newAPIHadoopRDD classes above.
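
As a sketch, here is how the same text file could be read through the new-API interface directly, making the parameters above explicit (this is roughly what textFile wraps for you):

scala> import org.apache.hadoop.io.{LongWritable, Text}
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> val raw = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt")
// key = byte offset of each line, value = the line itself; minimum partitions left at the default
scala> raw.map(pair => pair._2.toString).collect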

1.3.4.2 MySQL Database Connection
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
</dependencies>
package com.geekparkhub.core.spark.application.dataconnections
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * JDBCConnection
 */
object JDBCConnection {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JDBCConnection")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // JDBC connection properties
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://systemhub711:3306/company"
    val userName = "root"
    val passWd = "ax04854"
    // Create the JdbcRDD
    val jdbcRDD = new JdbcRDD[(Int, String)](sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, userName, passWd)
    }, "select id,name from staff where ? <= id and id <= ?",
      1,   // lower bound of the id range
      10,  // upper bound of the id range
      1,   // number of partitions
      x => {
        (x.getInt(1), x.getString(2))
      }
    )
    // Print the JdbcRDD results
    jdbcRDD.collect().foreach(println)
    // Release resources
    sc.stop()
  }
}
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * JBDCinsertData
 */
object JBDCinsertData {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JBDCRead")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Create the data
    val data = sc.parallelize(List("Female", "Male", "Female"))
    // Insert the data, one connection per partition
    data.foreachPartition(insertData)
  }
  // Insert-data method, executed on the executors
  def insertData(iterator: Iterator[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    val conn = java.sql.DriverManager.getConnection("jdbc:mysql://systemhub711:3306/company", "root", "000000")
    iterator.foreach(data => {
      val ps = conn.prepareStatement("insert into staff(name) values(?)")
      ps.setString(1, data)
      ps.executeUpdate()
    })
    conn.close()
  }
}
1.3.4.3 HBase Database
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * HbaseConnection
 */
object HbaseConnection {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseConnection")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Build the HBase configuration
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "systemhub511,systemhub611,systemhub711")
    conf.set(TableInputFormat.INPUT_TABLE, "test")
    // Read the HBase table
    val hbaseRDD = new NewHadoopRDD(sc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result], conf)
    // Extract the RowKeys
    val value: RDD[String] = hbaseRDD.map(x => Bytes.toString(x._2.getRow))
    // Output the data
    value.collect().foreach(println)
    // Release resources
    sc.stop()
  }
}
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * HbaseWrite
 */
object HbaseWrite {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseWrite")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Create the source RDD
    val initialRDD: RDD[(Int, String, Int)] = sc.parallelize(List((1, "apple", 11), (2, "banana", 12), (3, "pear", 13)))
    // Create the JobConf
    val conf = new JobConf()
    conf.set("hbase.zookeeper.quorum", "systemhub511,systemhub611,systemhub711")
    conf.setOutputFormat(classOf[TableOutputFormat])
    conf.set(TableOutputFormat.OUTPUT_TABLE, "test")
    // Convert one triple into an HBase Put
    def convert(triple: (Int, String, Int)): (ImmutableBytesWritable, Put) = {
      val put = new Put(Bytes.toBytes(triple._1))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, put)
    }
    // Convert the RDD
    val writRDD: RDD[(ImmutableBytesWritable, Put)] = initialRDD.map(convert)
    // Write to HBase
    writRDD.saveAsHadoopDataset(conf)
    // Release resources
    sc.stop()
  }
}

1.3.5 Advanced RDD Programming

1.3.5.1 Accumulators

Accumulators are used to aggregate information. When passing a function to Spark, for example with map() or a filter() condition, the function can use variables defined in the driver program, but every task running on the cluster gets a fresh copy of each such variable, and updating those copies does not propagate back to the driver's variable. Accumulators provide what is missing: a shared variable that all partitions can update as they are processed.

1.3.5.1.1 Built-in Accumulators

Calling SparkContext.accumulator(initialValue) in the driver creates an accumulator holding an initial value; the return value is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue. Executor code inside Spark closures can increase the accumulator with its += method (add in Java), and the driver program can read its value through the value property (value()/setValue() in Java).

Tasks on worker nodes cannot access an accumulator's value; from the tasks' point of view, an accumulator is a write-only variable.

For accumulators used in actions, Spark applies each task's update to each accumulator only once. So for an accumulator that must be absolutely reliable under failures and recomputation, put the update inside an action such as foreach(); in transformations, an accumulator may be updated more than once.
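
A minimal shell sketch of the reliable pattern, with the accumulator updated inside an action rather than a transformation:

scala> val acc = sc.longAccumulator("lineCount")
scala> sc.parallelize(1 to 100).foreach(x => acc.add(1))  // updated inside an action: applied exactly once
scala> acc.value
// 100, read back on the driver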

package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * AccuAction
 */
object AccuAction {
  def main(args: Array[String]): Unit = {
    // Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("AccuAction")
    // Create the SparkContext
    val sc = new SparkContext(sparkConf)
    // Create a built-in long accumulator
    val sum: LongAccumulator = sc.longAccumulator("sum")
    // Create an RDD
    val value: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
    val word: RDD[(Int, Int)] = value.map(x => {
      // Accumulate inside the task
      sum.add(x)
      (x, 1)
    })
    word.collect().foreach(println)
    println(sum.value)
    // Release resources
    sc.stop()
  }
}
1.3.5.1.2 Custom Accumulators

Custom accumulator types have been available since version 1.x, but they were awkward to use. Since 2.0 their usability has improved considerably, and the official API offers a friendlier abstract class, AccumulatorV2, for implementing accumulators over custom types. To implement one, extend AccumulatorV2 and override at least the methods shown in the example below.

package com.geekparkhub.core.spark.application.methods
import org.apache.spark.util.AccumulatorV2
/**
 * Geek International Park | GeekParkHub | https://www.geekparkhub.com/
 * HackerParkHub | https://www.hackerparkhub.org/
 * GeekDeveloper : JEEP-711
 *
 * @author system
 * AccumulatorAction
 */
class AccumulatorAction extends AccumulatorV2[Int,Int]{
  var sum = 0
  // Is the accumulator in its zero state?
  override def isZero: Boolean = sum == 0
  // Create a copy carrying the current state
  override def copy(): AccumulatorV2[Int, Int] = {
    val accumulatorAction = new AccumulatorAction
    accumulatorAction.sum = this.sum
    accumulatorAction
  }
  // Reset back to the zero state
  override def reset(): Unit = sum = 0
  // Fold one input value in
  override def add(v: Int): Unit = sum += v
  // Merge another accumulator of the same type
  override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value
  // Current value
  override def value: Int = sum
}
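
The class above still needs to be registered with the SparkContext before use. A short usage sketch (the accumulator name "intSum" is illustrative):

val myAcc = new AccumulatorAction
sc.register(myAcc, "intSum")                          // registration is required for custom accumulators
sc.parallelize(Array(1, 2, 3, 4)).foreach(myAcc.add)  // reliable: updated inside an action
println(myAcc.value)                                  // 10
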
1.3.5.2 Broadcast Variables (a Tuning Strategy)

Broadcast variables distribute larger objects efficiently: a large read-only value is sent to all worker nodes for use by one or more Spark operations.

They come in handy when, for example, your application needs to send a large read-only lookup table to every node, or a large feature vector in a machine-learning algorithm. When the same variable is used in multiple parallel operations, Spark would otherwise send it separately for every task.

Using a broadcast variable :
1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T; any serializable type can be used.
2. Access the value through the value property (the value() method in Java).
3. The variable is sent to each node only once and should be treated as read-only (updating it will not propagate to other nodes).

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
scala>

🔥 1.4 Spark SQL 🔥

1.4.1 Spark SQL Overview

1.4.1.1 What is Spark SQL

Spark SQL is Spark's module for processing structured data. It provides two programming abstractions, DataFrame and DataSet, and serves as a distributed SQL query engine.

We have already studied Hive, which translates Hive SQL into MapReduce and submits it to the cluster, greatly reducing the complexity of writing MapReduce programs. Because the MapReduce computation model executes rather slowly, Spark SQL came into being: it translates Spark SQL into RDDs and submits them to the cluster, executing very fast.

1.4.1.2 Spark SQL Features
1.4.1.3 What is a DataFrame

Like an RDD, a DataFrame is a distributed data container. A DataFrame, however, is more like a two-dimensional table in a traditional database: besides the data itself, it also records the data's structural information, i.e. its schema. Like Hive, DataFrames also support nested data types (struct / array / map).

From the standpoint of API usability, the DataFrame API's high-level relational operations are friendlier and lower-barrier than the functional RDD API.

(Figure: RDD[Person] vs. DataFrame structure)

The figure above shows the difference between a DataFrame and an RDD. The RDD[Person] on the left takes Person as a type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame on the right provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains, and each column's name and type. A DataFrame provides a schema view of the data; you can treat it as a table in a database.

A DataFrame is also lazily executed, and it performs better than an RDD. The main reason is its optimized execution plan: query plans are optimized by the Spark Catalyst optimiser.

(Figure: logical query-plan optimization, pushing the filter below the join)

To illustrate query optimization, the figure above shows a population-data analysis example. Two DataFrames are constructed, joined, and then filtered. Executing that plan unchanged is inefficient, because join is an expensive operation that may also produce a large dataset. If the filter is pushed down below the join, each DataFrame is filtered first and the join then runs on the much smaller filtered results, which shortens execution time considerably.
The Spark SQL query optimizer does exactly this. In short, optimizing a logical query plan is the process of replacing high-cost operations with low-cost ones through equivalence transformations based on relational algebra.
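
Even without relying on the optimizer, the cheaper plan can be expressed directly. A sketch of filter-before-join in the DataFrame API (the events DataFrame is illustrative; explain() prints each physical plan for comparison):

import spark.implicits._
val users  = spark.read.json("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
val events = Seq(("Andy", "login"), ("Justin", "logout")).toDF("name", "action")

// Naive plan: join first, then filter
users.join(events, "name").filter($"age" > 20).explain()
// Hand-optimized plan: filter first, then join the smaller inputs
users.filter($"age" > 20).join(events, "name").explain()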

1.4.1.4 What is a DataSet

1. DataSet is an extension of the DataFrame API and SparkSQL's newest data abstraction.

2. It has a friendly API style, with both type-safety checking and the DataFrame's query-optimization characteristics.

3. Datasets support encoders, which avoid deserializing entire objects when accessing off-heap data, improving efficiency.

4. Case classes are used to define the structure of the data in a Dataset; each attribute name of the case class maps directly to a field name in the DataSet.

5. A DataFrame is a special case of Dataset: DataFrame = Dataset[Row]. A DataFrame can therefore be converted to a Dataset with the as method. Row is a type, just like Car or Person; all table-structure information is represented with Row.

6. DataSets are strongly typed; you can have, for example, a Dataset[Car] or a Dataset[Person].

7. A DataFrame only knows its fields, not their types, so those operations cannot be type-checked at compile time; subtracting from a String, say, fails only at execution time. A DataSet knows both the fields and their types, so it has much stricter error checking, analogous to the difference between JSON objects and class instances.

1.4.2 Spark SQL Programming

1.4.2.1 SparkSession : The New Entry Point

In older versions, SparkSQL provided two SQL query entry points :
SQLContext : for Spark's own SQL queries.
HiveContext : for queries connecting to Hive.

SparkSession is Spark's newest SQL entry point. It is essentially the combination of SQLContext and HiveContext, so any API available on those two is also available on SparkSession. A SparkSession wraps a sparkContext internally, so the actual computation is carried out by the sparkContext.
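
A sketch of creating a SparkSession in an application (spark-shell builds one for you, as the transcripts below show; the appName is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")          // usually supplied by spark-submit instead
  .appName("SparkSessionDemo")
  .getOrCreate()

val sc = spark.sparkContext    // the wrapped sparkContext performs the actual computation
// .enableHiveSupport() on the builder restores what HiveContext used to provide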

1.4.2.2 DataFrame
1.4.2.2.1 Creating

In SparkSQL, SparkSession is the entry point for creating DataFrames and executing SQL.
There are three ways to create a DataFrame :
1. From a Spark data source.
2. By converting an existing RDD.
3. By querying a Hive Table.

scala> spark.read.
csv jdbc load options parquet table textFile
format json option orc schema text
scala> spark.read.
scala> val jsonflow = spark.read.json("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
jsonflow: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> jsonflow.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
1.4.2.2.2 SQL-Style Syntax (Primary)
scala> jsonflow.createTempView("people")
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
scala> jsonflow.createGlobalTempView("peoples")
scala> spark.sql("SELECT * FROM global_temp.peoples").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.peoples").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
1.4.2.2.3 DSL-Style Syntax (Secondary)
scala> jsonflow.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala>
scala> jsonflow.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala>
scala> jsonflow.select($"name",$"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala>
scala> jsonflow.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala>
scala> jsonflow.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
scala>
1.4.2.2.4 Converting an RDD to a DataFrame

Operations between RDDs and DataFrames/DataSets require the import import spark.implicits._
Here spark is not a package name but the name of the SparkSession object.

scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[30] at textFile at <console>:27
scala>
scala> peopleRDD.map{x=>{val split = x.split(",");(split(0),split(1).trim)}}.toDF("name","age")
res11: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala>
scala> case class People(name:String, age:Int)
defined class People
scala> peopleRDD.map{x=>{val split = x.split(",");People(split(0),split(1).trim.toInt)}}.toDF
res17: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> res17.toDF
res18: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala>
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
scala> val structType: StructType = StructType(StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala>
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala>
scala> val data = peopleRDD.map{x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
scala>
scala> val dataFrame = spark.createDataFrame(data, structType)
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SqlAction
* <p>
*/
object SqlAction {
def main(args: Array[String]): Unit = {
// Create the SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("SqlAction").getOrCreate()
// Get the SparkContext
val sc: SparkContext = sparkSession.sparkContext
// Create an RDD
val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
// Convert the Int-typed RDD to a Row-typed RDD
val rowRDD: RDD[Row] = rdd.map(x => {Row(x)})
// Print the data
rowRDD.collect().foreach(println)
// Build the schema metadata
val structType = new StructType
val structTypes: StructType = structType.add(StructField("id", IntegerType))
val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD,structTypes)
// Import implicit conversions
import sparkSession.implicits._
// DSL-style query
dataFrame.select("id").show()
// Release resources
sparkSession.stop()
}
}
1.4.2.2.5 Converting a DataFrame to an RDD
scala> val df = spark.read.json("/core_flow/spark/json/001/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala>
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:29
scala>
scala> dfToRDD.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
scala>
1.4.2.3 DataSet
1.4.2.3.1 Creating a Dataset
scala> case class Person(name: String, age: Long)
defined class Person
scala>
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala>
1.4.2.3.2 Converting an RDD to a Dataset
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[8] at textFile at <console>:28
scala>
scala> case class Person(name: String, age: Long)
defined class Person
scala>
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
res2: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
1.4.2.3.3 Converting a Dataset to an RDD
scala> val DS= Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
scala> DS.rdd
res3: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[12] at rdd at <console>:28
scala> res3.collect
res4: Array[Person] = Array(Person(Andy,32))
scala>
1.4.2.4 Interoperating between DataFrame and Dataset
1.4.2.4.1 DataFrame to Dataset
scala> val df = spark.read.json("./examples/src/main/resources/people.json")
scala> case class Person(name: String, age: Long)
defined class Person
scala>
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala>
1.4.2.4.2 Dataset to DataFrame
scala> case class Person(name: String, age: Long)
defined class Person
scala>
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala>
1.4.2.5 RDD / DataFrame / DataSet

enter image description here

In Spark SQL, Spark provides two new abstractions: DataFrame and DataSet.
How do they differ from RDD? First, by the version in which each appeared:

RDD (Spark 1.0) —> DataFrame (Spark 1.3) —> Dataset (Spark 1.6)

Given the same data, all three structures produce the same result after computation; what differs is execution efficiency and execution style.
In later Spark versions, Dataset is expected to gradually replace RDD and DataFrame as the single unified API.

1.4.2.5.1 What the three have in common
For example, a row of either a DataFrame or a Dataset can be pattern-matched inside map:
DF.map{
case Row(col1: String, col2: Int) =>
println(col1); println(col2)
col1
case _ => ""
}
// Define the field names and types
case class Coltest(col1: String, col2: Int) extends Serializable
DS.map{
case Coltest(col1, col2) =>
println(col1); println(col2)
col1
case _ => ""
}
1.4.2.5.2 How the three differ
For example, a DataFrame row is an untyped Row whose fields must be fetched by name:
DF.foreach{ line =>
val col1 = line.getAs[String]("col1")
val col2 = line.getAs[String]("col2")
}
1.4.2.6 SparkSQL Application
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SqlAction
* <p>
*/
object SqlAction {
def main(args: Array[String]): Unit = {
// Create the SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("SqlAction").getOrCreate()
// Import implicit conversions
import sparkSession.implicits._
// Create a DataFrame
val df: DataFrame = sparkSession.read.json("/Volumes/GEEK-SYSTEM/Technical_Framework/spark/projects/spark_server/spark-sql/data/people.json")
// SQL-style query | register a temporary view
df.createTempView("PEOPLE")
sparkSession.sql("SELECT * FROM PEOPLE").show()
// DSL-style query
df.select("name").show()
// Release resources
sparkSession.stop()
}
}
1.4.2.7 User-Defined Functions
1.4.2.7.1 Custom UDFs
scala> val df = spark.read.json("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
scala> spark.udf.register("addName",(x:String) => "Name:" + x)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala>
scala> df.createOrReplaceTempView("people")
scala> spark.sql("Select addName(name),age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
| Name:Michael|null|
| Name:Andy| 30|
| Name:Justin| 19|
+-----------------+----+
scala>
1.4.2.7.2 Custom UDAF aggregate functions
package com.geekparkhub.core.spark.application.aggregation
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* AvgAction
* <p>
*/
object AvgAction extends UserDefinedAggregateFunction {
// Input data type
override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)
// Types of the intermediate buffer values
override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
// Output data type
override def dataType: DataType = DoubleType
// Whether the function is deterministic
override def deterministic: Boolean = true
// Initialize the buffer
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Update within an executor
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1L
}
// Merge across executors
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Compute the final result
override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}
package com.geekparkhub.core.spark.application.aggregation
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* UdafAction
* <p>
*/
object UdafAction {
def main(args: Array[String]): Unit = {
// Create the SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("UdafAction").getOrCreate()
// Create a DataFrame
val df: DataFrame = sparkSession.read.json("/Volumes/GEEK-SYSTEM/Technical_Framework/spark/projects/spark_server/spark-sql/data/people.json")
// SQL-style query | register a temporary view
df.createTempView("PEOPLE")
// Register the custom aggregate function
sparkSession.udf.register("AvgAction", AvgAction)
// Use the custom function
sparkSession.sql("SELECT AvgAction(age) FROM PEOPLE").show()
// Release resources
sparkSession.stop()
}
}

1.4.3 Spark SQL Data Sources

1.4.3.1 Generic load / save methods
1.4.3.1.1 Specifying options manually
scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.select("name","favorite_color").write.save("namesAndFavColors.parquet")
scala>
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://systemhub511:9000/namesAndAges.parquet")
scala> peopleDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://systemhub511:9000/namesAndAges.parquet`")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
1.4.3.1.2 File save options (SaveMode)
Scala / Java | Any Language | Meaning
SaveMode.ErrorIfExists (default) | "error" (default) | Throw an error if the target already exists
SaveMode.Append | "append" | Append to the existing data
SaveMode.Overwrite | "overwrite" | Overwrite the existing data
SaveMode.Ignore | "ignore" | Ignore the operation if the data already exists
/**
* Specifies the behavior when data or table already exists. Options include:
* - `overwrite`: overwrite the existing data.
* - `append`: append the data.
* - `ignore`: ignore the operation (i.e. no-op).
* - `error`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter[T] = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
}
this
}
1.4.3.2 JSON Files
scala> import spark.implicits._
scala> val path = "examples/src/main/resources/people.json"
scala> val peopleDF = spark.read.json(path)
scala> peopleDF.createOrReplaceTempView("people")
scala> val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
scala> teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
scala> val otherPeopleDataset = spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
scala> val otherPeople = spark.read.json(otherPeopleDataset)
scala> otherPeople.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+
1.4.3.3 Parquet Files
scala> import spark.implicits._
scala> val peopleDF = spark.read.json("examples/src/main/resources/people.json")
scala> peopleDF.write.parquet("hdfs://systemhub511:9000/people.parquet")
scala> val parquetFileDF = spark.read.parquet("hdfs://systemhub511:9000/people.parquet")
scala> parquetFileDF.createOrReplaceTempView("parquetFile")
scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
scala> namesDF.map(attributes => "Name: " + attributes(0)).show()
+------------+
| value|
+------------+
|Name: Justin|
+------------+
1.4.3.4 JDBC
[root@systemhub711 ~]# cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/module/spark/jars/
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://systemhub711:3306/company").option("dbtable","staff").option("user","root").option("password","ax01465").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> jdbcDF.show
+---+-------+------+
| id| name| sex|
+---+-------+------+
| 1|test001| male|
| 2|test002|female|
| 3|test003|female|
| 4|test004| male|
| 5|test005|female|
| 6|test006| male|
| 7|test007|female|
| 8|test008|female|
| 9|test009|female|
| 10|test010|female|
| 11|test011|female|
| 12|test012| male|
| 13| Female| null|
| 14| Male| null|
| 15| Female| null|
+---+-------+------+
scala>
scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://systemhub711:3306/company").option("dbtable","rddtable2").option("user","root").option("password","ax01465").save()
1.4.3.5 Hive Database

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. With Hive support, Spark SQL can access Hive tables, use UDFs (user-defined functions), and run the Hive query language (HiveQL/HQL). It is worth stressing that including the Hive libraries in Spark SQL does not require a pre-installed Hive. In general it is best to compile Spark SQL with Hive support so that all of these features are available; if you downloaded a binary build of Spark, it should already have been compiled with Hive support.

To connect Spark SQL to an existing Hive deployment, you must copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL still runs without a Hive deployment; note that in that case it creates its own Hive metastore, called metastore_db, in the current working directory. In addition, if you create tables with HiveQL's CREATE TABLE (as opposed to CREATE EXTERNAL TABLE), they are placed under /user/hive/warehouse on the default filesystem (HDFS if a configured hdfs-site.xml is on the classpath, otherwise the local filesystem).

1.4.3.5.1 Embedded Hive
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table hivetest(id int)")
19/06/03 02:03:17 WARN metastore.HiveMetaStore: Location: file:/opt/module/spark/spark-warehouse/hivetest specified for non-external table:hivetest
res5: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| hivetest| false|
+--------+---------+-----------+
scala> spark.sql("select * from hivetest").show()
+---+
| id|
+---+
+---+
scala>
1.4.3.5.2 External Hive
[root@systemhub711 spark]# /opt/module/hive/bin/hive
[root@systemhub711 spark]# cp /opt/module/hive/conf/hive-site.xml ./conf/
scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| business| false|
| default| dept| false|
| default| dept_partition| false|
| default| emp| false|
| default| emp_sex| false|
| default|hive_hbase_emp_table| false|
| default| hive_workflow| false|
| default| location| false|
| default| movie_info| false|
| default|multitasking_hive...| false|
| default| person_info| false|
| default| relevance_hbase_emp| false|
| default| score| false|
| default| staff_hive| false|
| default| test| false|
| default| test001| false|
| default| test002| false|
| default| test003| false|
| default| test004| false|
| default| test005| false|
+--------+--------------------+-----------+
only showing top 20 rows
scala> spark.sql("select * from emp").show
+-----+-----+---------+----+----------+--------+-----+------+
|empno|ename| job| mgr| hiredate| sal| comm|deptno|
+-----+-----+---------+----+----------+--------+-----+------+
| 7369|SMITH|CLERKSKLD|7902|1980-12-17| 800.0| 20.0| null|
| 7499|ALLTE|SALESMANS|7689|1987-02-23| 1600.0|300.0| 30|
| 7521|WAROS|SJDHHJDJX|7869|1984-06-12| 1250.18|500.0| 30|
| 7566|JOSSS|JDHYHDSDS|4545|1874-05-15| 2894.25| 20.0| null|
| 7654|SOCTD|MANSJUSSD|4855|1996-02-14| 2852.3| 30.0| null|
| 7698|ADAMS|JUSHHWESD|4552|1985-05-16|25524.02| 30.0| null|
| 7782|JAMSK|KIHNGSEHN|7769|1991-06-23| 1100.0| 20.0| null|
| 7788|FOESS|CLAEDFDFD|7698|1994-09-17| 950.0| 30.0| null|
| 7939|KINGS|CLADDJHEW|7566|1993-07-12| 3000.0| 20.0| null|
+-----+-----+---------+----+----------+--------+-----+------+
scala>
1.4.3.5.3 Running the Spark SQL CLI
[root@systemhub711 spark]# bin/spark-sql
spark-sql (default)> show tables;
database tableName isTemporary
default business false
default dept false
default dept_partition false
default emp false
default emp_sex false
default hive_hbase_emp_table false
default hive_workflow false
default location false
default movie_info false
default multitasking_hive_workflow false
default person_info false
default relevance_hbase_emp false
default score false
default staff_hive false
default test false
default test001 false
default test002 false
default test003 false
default test004 false
default test005 false
default test006 false
default test007 false
default test008 false
default test_buck false
default test_bucket false
Time taken: 6.2 seconds, Fetched 25 row(s)
19/06/03 02:17:58 INFO CliDriver: Time taken: 6.2 seconds, Fetched 25 row(s)
spark-sql (default)>
1.4.3.5.4 Connecting to Spark SQL for Hive from IDEA
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
<module>spark-core</module>
<module>spark-sql</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
</project>
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.sql.SparkSession
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SparkHiveAction
* <p>
*/
object SparkHiveAction {
def main(args: Array[String]): Unit = {
// Create the SparkSession
val sparkSession: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("SparkHiveAction")
.getOrCreate()
// Show the tables in the metastore
sparkSession.sql("show tables").show()
// Release resources
sparkSession.stop()
}
}
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| business| false|
| default| dept| false|
| default| dept_partition| false|
| default| emp| false|
| default| emp_sex| false|
| default|hive_hbase_emp_table| false|
| default| hive_workflow| false|
| default| location| false|
| default| movie_info| false|
| default|multitasking_hive...| false|
| default| person_info| false|
| default| relevance_hbase_emp| false|
| default| score| false|
| default| staff_hive| false|
| default| test| false|
| default| test001| false|
| default| test002| false|
| default| test003| false|
| default| test004| false|
| default| test005| false|
+--------+--------------------+-----------+
only showing top 20 rows

1.4.4 Spark SQL Examples

🔥 1.5 Spark Streaming 🔥

1.5.1 Spark Streaming Overview

1.5.1.1 What Spark Streaming Is

Spark Streaming is used for stream processing. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets. Once data arrives, it can be processed with Spark's high-level primitives such as map, reduce, join, and window, and the results can be stored in many places, such as HDFS or a database. Spark Streaming also integrates seamlessly with MLlib (machine learning) and GraphX.

enter image description here

Spark Streaming is built on a concept very similar to Spark's RDDs: it uses a discretized stream, called a DStream, as its abstraction. A DStream is the sequence of data received over time; internally, the data received in each time interval exists as an RDD, and the DStream is the sequence of these RDDs (hence "discretized").

A DStream can be created from various input sources, such as Flume, Kafka, or HDFS. A DStream supports two kinds of operations: transformations, which produce a new DStream, and output operations, which write data to an external system. DStreams provide many operations similar to those supported on RDDs, plus new time-related operations such as sliding windows.

1.5.1.2 Spark Streaming Features
1.5.1.3 Spark Streaming Architecture

enter image description here

1.5.2 DataStream Basics

1.5.2.1 WordCount Example
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
package com.geekparkhub.core.spark.application.example
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* StreamWordCounAction
* <p>
*/
object StreamWordCounAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCounAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// Create the DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("systemhub511", 9999)
// Split each line into words
val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))
// Map each word to a (word, 1) tuple
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
// Count the occurrences of each word
val DStreamResult: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
// Print the results
DStreamResult.print()
// Start the streaming job
ssc.start()
ssc.awaitTermination()
}
}
[root@systemhub511 ~]# start-cluster.sh
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-history-server.sh
[root@systemhub511 spark]# nc -lk 9999
hello
spark
io
io
io
-------------------------------------------
Time: 1559563323000 ms
-------------------------------------------
(hello,1)
-------------------------------------------
Time: 1559563326000 ms
-------------------------------------------
(spark,1)
-------------------------------------------
Time: 1559563329000 ms
-------------------------------------------
Time: 1559563341000 ms
-------------------------------------------
(io,1)
-------------------------------------------
Time: 1559563344000 ms
-------------------------------------------
(io,2)
1.5.2.2 WordCount Explained

Discretized Stream (DStream) is Spark Streaming's basic abstraction. It represents a continuous data stream, or the result stream produced by applying Spark primitives. Internally, a DStream is represented by a series of consecutive RDDs, each holding the data from one time interval.

enter image description here

Operations on the data are likewise applied one RDD at a time.

enter image description here

The computation itself is carried out by the Spark engine.

enter image description here

1.5.3 Creating DataStreams

Spark Streaming natively supports a number of input sources. Some core sources are packaged in Spark Streaming's own Maven artifact, while others are available through additional artifacts such as spark-streaming-kafka. Each receiver runs as a long-running task inside a Spark executor, and therefore occupies one of the CPU cores allocated to the application.

In addition, there must be CPU cores available to process the data. This means that to run multiple receivers you need at least as many cores as receivers, plus the cores needed for the computation itself. For example, to run 10 receivers in a streaming application you must allocate at least 11 CPU cores. Consequently, when running in local mode, never use local or local[1]; a minimal sketch follows.

1.5.3.1 File Data Source

File streams can read from any filesystem compatible with the HDFS API. They are created with the fileStream method; Spark Streaming will monitor the dataDirectory and continuously process files moved into it. Nested directories are not currently supported. (A sketch of fileStream itself follows the usage line below.)

1.5.3.1.1 Usage
streamingContext.textFileStream(dataDirectory)
1.5.3.1.2 Hands-on Example
[root@systemhub511 spark]# hadoop fs -mkdir /core_flow/spark/filestream
[root@systemhub511 filestream]# vim a.txt
[root@systemhub511 filestream]# vim b.txt
[root@systemhub511 filestream]# vim c.txt
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* FileStreamAction
* <p>
*/
object FileStreamAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FileStreamAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// DStream that monitors a directory
val fileDStream: DStream[String] = ssc.textFileStream("hdfs://systemhub511:9000/core_flow/spark/filestream/")
// Print the results
fileDStream.print()
// Start the streaming job
ssc.start()
ssc.awaitTermination()
}
}
-------------------------------------------
Time: 1559566113000 ms
-------------------------------------------
-------------------------------------------
Time: 1559566116000 ms
-------------------------------------------
-------------------------------------------
Time: 1559566119000 ms
-------------------------------------------
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/a.txt /core_flow/spark/filestream/
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/b.txt /core_flow/spark/filestream/
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/c.txt /core_flow/spark/filestream/
-------------------------------------------
Time: 1559566146000 ms
-------------------------------------------
SparkStreaming
SparkStreaming
SparkStream
DStream
-------------------------------------------
Time: 1559566155000 ms
-------------------------------------------
textFileStream
textFileStream
StreamingContext
-------------------------------------------
Time: 1559566164000 ms
-------------------------------------------
awaitTermination
hadoop
1.5.3.2 RDD Queue
1.5.3.2.1 Usage

For testing, a DStream can be created with ssc.queueStream(queueOfRDDs); every RDD pushed into the queue is processed as part of the DStream.

1.5.3.2.2 Hands-on Example
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* QueuStreamAction
* <p>
*/
object QueuStreamAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("QueuStreamAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// Create the RDD queue
val rddQueue = new mutable.Queue[RDD[Int]]()
// Create the DStream from the queue
val rddDStream: InputDStream[Int] = ssc.queueStream(rddQueue,false)
// Aggregate each batch
val result: DStream[Int] = rddDStream.reduce(_ + _)
// Print the results
result.print()
// Start the streaming job
ssc.start()
// Push RDDs into the queue in a loop
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
-------------------------------------------
Time: 1559567436000 ms
-------------------------------------------
5050
-------------------------------------------
Time: 1559567439000 ms
-------------------------------------------
10100
-------------------------------------------
Time: 1559567442000 ms
-------------------------------------------
5050
-------------------------------------------
Time: 1559567445000 ms
-------------------------------------------
5050
1.5.3.3 Custom Data Source
1.5.3.3.1 Usage
Extend the Receiver class, implementing onStart() and onStop(), then pass an instance to ssc.receiverStream(), as in the example below.
1.5.3.3.2 Hands-on Example
package com.geekparkhub.core.spark.application.datastream
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* CustomizeReceiver
* <p>
*/
class CustomizeReceiver(hostName: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
// Start reading data
override def onStart(): Unit = {
new Thread("receiver") {
override def run(): Unit = {
receiver()
}
}.start()
}
// Read the data
def receiver(): Unit = {
try {
// Open the socket
val socket = new Socket(hostName, port)
// Variable holding the data received from the port
var input: String = null
// BufferedReader for reading the data sent to the port
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
// Read the first line
input = reader.readLine()
while (input != null) {
store(input)
input = reader.readLine()
}
// Once the loop exits, release resources
reader.close()
socket.close()
// Restart the receiver
restart("restart")
} catch {
case e: Exception => restart("restart")
}
}
// Stop reading data
override def onStop(): Unit = {}
}
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* CustomizeReceiverAction
* <p>
*/
object CustomizeReceiverAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CustomizeReceiverAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
val lineDStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomizeReceiver("systemhub511", 9999))
// Split each line into words
val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))
// Map each word to a (word, 1) tuple
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
// Count the occurrences of each word
val DStreamResult: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
// Print the results
DStreamResult.print()
// Start the streaming job
ssc.start()
ssc.awaitTermination()
}
}
[root@systemhub511 spark]# nc -lk 9999
CustomizeReceiverAction
CustomizeReceiverAction
CustomizeReceiver
CustomizeReceiver
CustomizeReceiv
-------------------------------------------
Time: 1559570220000 ms
-------------------------------------------
Time: 1559570226000 ms
-------------------------------------------
(CustomizeReceiverAction,1)
-------------------------------------------
Time: 1559570229000 ms
-------------------------------------------
(CustomizeReceiverAction,1)
(CustomizeReceiver,1)
-------------------------------------------
Time: 1559570232000 ms
-------------------------------------------
(CustomizeReceiver,1)
-------------------------------------------
Time: 1559570238000 ms
-------------------------------------------
(CustomizeReceiv,1)
1.5.3.4 Kafka Data Source
1.5.3.4.1 Usage
1.5.3.4.2 Hands-on Example
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
<module>spark-core</module>
<module>spark-sql</module>
<module>spark-streaming</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>
</project>
package com.geekparkhub.core.spark.application.datastream
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* KafkaSparkStreamingAction
* <p>
*/
object KafkaSparkStreamingAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreamingAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// Declare the Kafka parameters
val zookeeper = "systemhub511:2181,systemhub611:2181,systemhub711:2181"
val topic = "topic001"
val consumerGroup = "spark"
// Define the Kafka parameter map
val kafkaPara: Map[String, String] = Map[String, String](
ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
"zookeeper.connect" -> zookeeper,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// Create the Kafka DStream
val KafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
// Print the results
KafkaDStream.print()
// Start the streaming job
ssc.start()
ssc.awaitTermination()
}
}
[root@systemhub511 kafka]# bin/kafka-topics.sh --list --zookeeper systemhub511:2181
__consumer_offsets
ct
topic001
topic002
topic003
topic004
[root@systemhub511 kafka]# bin/kafka-console-producer.sh --broker-list systemhub511:9092 --topic topic001
>
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging

The org.apache.spark.Logging trait was removed in Spark 2.0, while spark-streaming-kafka 1.6.3 still references it. A common workaround is to copy the old trait into your own project under the same package, as below:

package org.apache.spark
import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
/** :: DeveloperApi ::
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
*
* NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
* This will likely be changed or removed in future releases.
*/
@DeveloperApi
trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
@transient private var log_ : Logger = null
// Method to get the logger name for this object
protected def logName = {
// Ignore trailing $'s in the class names for Scala objects
this.getClass.getName.stripSuffix("$")
}
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
initializeIfNecessary()
log_ = LoggerFactory.getLogger(logName)
}
log_
}
// Log methods that take only a String
protected def logInfo(msg: => String) {
if (log.isInfoEnabled) log.info(msg)
}
protected def logDebug(msg: => String) {
if (log.isDebugEnabled) log.debug(msg)
}
protected def logTrace(msg: => String) {
if (log.isTraceEnabled) log.trace(msg)
}
protected def logWarning(msg: => String) {
if (log.isWarnEnabled) log.warn(msg)
}
protected def logError(msg: => String) {
if (log.isErrorEnabled) log.error(msg)
}
// Log methods that take Throwables (Exceptions/Errors) too
protected def logInfo(msg: => String, throwable: Throwable) {
if (log.isInfoEnabled) log.info(msg, throwable)
}
protected def logDebug(msg: => String, throwable: Throwable) {
if (log.isDebugEnabled) log.debug(msg, throwable)
}
protected def logTrace(msg: => String, throwable: Throwable) {
if (log.isTraceEnabled) log.trace(msg, throwable)
}
protected def logWarning(msg: => String, throwable: Throwable) {
if (log.isWarnEnabled) log.warn(msg, throwable)
}
protected def logError(msg: => String, throwable: Throwable) {
if (log.isErrorEnabled) log.error(msg, throwable)
}
protected def isTraceEnabled(): Boolean = {
log.isTraceEnabled
}
private def initializeIfNecessary() {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
initializeLogging()
}
}
}
}
private def initializeLogging() {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
// This distinguishes the log4j 1.2 binding, currently
// org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
// org.apache.logging.slf4j.Log4jLoggerFactory
val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
lazy val isInInterpreter: Boolean = {
try {
val interpClass = classForName("org.apache.spark.repl.Main")
interpClass.getMethod("interp").invoke(null) != null
} catch {
case _: ClassNotFoundException => false
}
}
def classForName(className: String): Class[_] = {
Class.forName(className, true, getContextOrSparkClassLoader)
// scalastyle:on classforname
}
def getContextOrSparkClassLoader: ClassLoader =
Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
def getSparkClassLoader: ClassLoader = getClass.getClassLoader
if (usingLog4j12) {
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
if (!log4j12Initialized) {
// scalastyle:off println
if (isInInterpreter) {
val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
case None =>
System.err.println(s"Spark was unable to load $replDefaultLogProps")
}
} else {
val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
// scalastyle:on println
}
}
Logging.initialized = true
// Force a call into slf4j to initialize it. Avoids this happening from multiple threads
// and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
log
}
}
private object Logging {
@volatile private var initialized = false
val initLock = new Object()
try {
// We use reflection here to handle the case where users remove the
// slf4j-to-jul bridge order to route their logs to JUL.
val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
if (!installed) {
bridgeClass.getMethod("install").invoke(null)
}
} catch {
case e: ClassNotFoundException => // can't log anything yet so just fail silently
}
}
-------------------------------------------
Time: 1559624490000 ms
-------------------------------------------
[root@systemhub511 kafka]# bin/kafka-console-producer.sh --broker-list systemhub511:9092 --topic topic001
>top001
>top002
>top003
>top004
>top005
>top006
-------------------------------------------
Time: 1559624499000 ms
-------------------------------------------
(null,top001)
-------------------------------------------
Time: 1559624502000 ms
-------------------------------------------
(null,top002)
-------------------------------------------
Time: 1559624505000 ms
-------------------------------------------
(null,top003)
-------------------------------------------
Time: 1559624508000 ms
-------------------------------------------
(null,top004)
-------------------------------------------
Time: 1559624511000 ms
-------------------------------------------
(null,top005)
-------------------------------------------
Time: 1559624514000 ms
-------------------------------------------
(null,top006)

1.5.4 DataStream Transformations

1.5.4.1 Stateless Transformations

Stateless transformations simply apply ordinary RDD transformations to every batch, that is, to every RDD inside the DStream. Some of them are listed in the table below. Note that for the key-value DStream transformations (such as reduceByKey()) to be usable from Scala, you must add import StreamingContext._.

Name | Purpose | Scala example | Signature of the user function on DStream[T]
map() | Applies the given function to each element of the DStream, returning a DStream of the outputs | ds.map(x => x + 1) | f: (T) -> U
flatMap() | Applies the given function to each element, returning a DStream of the flattened iterator contents | ds.flatMap(x => x.split(" ")) | f: T -> Iterable[U]
filter() | Returns a DStream containing only the elements that pass the predicate | ds.filter(x => x != 1) | f: T -> Boolean
repartition() | Changes the number of partitions of the DStream | ds.repartition(10) | N/A
reduceByKey() | Reduces records with the same key within each batch | ds.reduceByKey((x,y) => x + y) | f: (T, T) -> T
groupByKey() | Groups records by key within each batch | ds.groupByKey() | N/A

Keep in mind that although these functions look as if they operate on the whole stream, each DStream is internally composed of many RDDs (batches), and a stateless transformation is applied to each RDD separately. For example, reduceByKey() reduces data within each time interval, but does not reduce across intervals.

In the earlier wordcount program, for instance, only the words received within one batch are counted; the counts do not accumulate.

Stateless transformations can also combine data from multiple DStreams, again within each time interval. For example, key-value DStreams have the same join-related transformations as RDDs, namely cogroup(), join(), leftOuterJoin(), and so on; using them on DStreams applies the corresponding RDD operation to each pair of batches.
You can also use union() on a DStream, just as in regular Spark, to merge its contents with another DStream, or StreamingContext.union() to merge several streams; a sketch follows.

1.5.4.2 Stateful Transformations
1.5.4.2.1 UpdateStateByKey

The UpdateStateByKey primitive is used to maintain history: sometimes state must be kept across batches in a DStream (for example, a running wordcount in a streaming computation). For such cases, updateStateByKey() gives access to a state variable for key-value DStreams. Given a DStream of (key, event) pairs and a function specifying how to update each key's state from new events, it builds a new DStream whose contents are (key, state) pairs.

The result of updateStateByKey() is a new DStream whose internal sequence of RDDs consists of the (key, state) pairs for each time interval.

The updateStateByKey operation lets you maintain arbitrary state while updating it with new information. Using it takes two steps:

  1. Define the state; it can be any data type.
  2. Define the state-update function, which specifies how to combine the previous state with the new values from the input stream.

Using updateStateByKey requires configuring a checkpoint directory, which is used to save the state.
An updated version of wordcount:

package com.geekparkhub.core.spark.application.example
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* UpdateStateByKeyWordCounAction
* <p>
*/
object UpdateStateByKeyWordCounAction {
def main(args: Array[String]): Unit = {
// Create the SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UpdateStateByKeyWordCounAction")
// Create the StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// Configure the checkpoint directory
ssc.checkpoint("./ck")
// Create the DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("systemhub511", 9999)
// Split each line into words
val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))
// Map each word to a (word, 1) tuple
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
/**
* State-update function
* values: the word counts of the current batch
* state: the accumulated word counts of previous batches
*/
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val count: Int = values.sum
val perCount: Int = state.getOrElse(0)
Some(count + perCount)
}
// Count the occurrences of each word across batches
val DStreamResult: DStream[(String, Int)] = wordAndOneDStream.updateStateByKey(updateFunc)
// Print the results
DStreamResult.print()
// Start the streaming job
ssc.start()
ssc.awaitTermination()
}
}
[root@systemhub511 spark]# nc -kl 9999
hello
hello
hello
geek
geek
hello
hey
hey
test
test
hello
-------------------------------------------
Time: 1559643684000 ms
-------------------------------------------
(hello,1)
-------------------------------------------
Time: 1559643687000 ms
-------------------------------------------
(hello,1)
-------------------------------------------
Time: 1559643690000 ms
-------------------------------------------
(hello,3)
-------------------------------------------
Time: 1559643693000 ms
-------------------------------------------
(hello,3)
(geek,1)
-------------------------------------------
Time: 1559643696000 ms
-------------------------------------------
(hello,3)
(geek,2)
-------------------------------------------
Time: 1559643699000 ms
-------------------------------------------
(hello,4)
(geek,2)
-------------------------------------------
Time: 1559643702000 ms
-------------------------------------------
(hello,4)
(geek,2)
(hey,1)
-------------------------------------------
Time: 1559643705000 ms
-------------------------------------------
(hello,4)
(geek,2)
(hey,2)
-------------------------------------------
Time: 1559643708000 ms
-------------------------------------------
(hello,4)
(test,2)
(geek,2)
(hey,2)
-------------------------------------------
Time: 1559643711000 ms
-------------------------------------------
(hello,5)
(test,2)
(geek,2)
(hey,2)

🔒 Not yet unlocked, still being explored... stay tuned for blog updates! 🔒

1.5.4.2.2 Window Operations
1.5.4.3 Other Important Operations
1.5.4.3.1 Transform
1.5.4.3.2 Join

1.5.5 DataStream Output

1.5.6 DataStream Program

🔥 2. Advanced Spark 🔥

2.1 Internals

2.2 Performance Tuning

3. 修仙之道: Iterating the Technical Architecture Toward Its Peak

Alt text


💡 How to Contribute to This Open-Source Document 💡

  1. Most of this blog was typed by hand, so typos are inevitable; you can help by spotting them.

  2. Many topics may not be covered yet, so you can contribute additional material.

  3. Existing content is bound to have gaps or errors, so you can amend or extend what is already here.

  4. 💡 Contributions from any field are welcome: open-source blogs, notes, articles, snippets, shares, ideas, OpenSource Projects, Code & Code Review.

  5. 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈 issues: geekparkhub.github.io/issues 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈

Hopefully every article here helps its readers learn and improve; that is every author's original intention.


💌 Thank you for reading. Your comments and suggestions are welcome 💌

Donate: this project's growth depends on your support, so buy the developer a cup of ☕Coffee☕!

enter image description here

Acknowledgements

Please include your UserName in the remark when donating

ID | UserName | Donation | Money | Consume
1 | Object | WeChatPay | 5 RMB | a cup of cola
2 | 泰迪熊看月亮 | AliPay | 20 RMB | a cup of coffee
3 | 修仙道长 | WeChatPay | 10 RMB | two cups of cola

License

Apache License Version 2.0